/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.async.socket;

import core.stdc.stdint;
import core.thread : Fiber;
import core.time : dur, Duration;
import std.array : empty;
import std.conv : to;
import std.exception : enforce;
import std.socket;
import thrift.base;
import thrift.async.base;
import thrift.transport.base;
import thrift.transport.socket : TSocketBase;
import thrift.internal.endian;
import thrift.internal.socket;

version (Windows) {
  import core.sys.windows.winsock2 : connect;
} else version (Posix) {
  import core.sys.posix.sys.socket : connect;
} else static assert(0, "Don't know connect on this platform.");

/**
 * Non-blocking socket implementation of the TTransport interface.
 *
 * Whenever a socket operation would block, TAsyncSocket registers a callback
 * with the specified TAsyncSocketManager and yields.
 *
 * As for thrift.transport.socket, due to the limitations of std.socket,
 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
 * not).
 */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /**
   * The async manager this socket registers its listeners and work items
   * with.
   */
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If establishing the connection
   * fails in the background (or exceeds connectTimeout), this is currently
   * not indicated in any way other than every call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, if
   *   the host cannot be resolved, or if connect() fails right away with an
   *   error other than "connection in progress".
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));


    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      addr = getAddress(host_, port_)[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
    if (errorCode == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code before doing anything else with the socket, so
    // that it cannot be overwritten by the cleanup below.
    auto errno = getSocketErrno();
    if (errno != CONNECT_INPROGRESS_ERRNO) {
      // The socket just created is of no use anymore; release it right away
      // instead of leaving a dangling, never-connected handle in socket_.
      closeImmediately();

      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(errno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        // Written by the listener below before the fiber is resumed.
        TAsyncEventReason reason = void;
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // The socket was reported ready; SO_ERROR tells whether the
        // background connect actually succeeded.
        int32_t errorCode = void;
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, errorCode);

        if (errorCode) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(errorCode));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

      }
    );
  }

  /**
   * Closes the socket.
   *
   * Queues the actual close operation as a work item with the async manager
   * and blocks until that work item has run, i.e. until all operations
   * queued before it are finished.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);

    // Guarded by doneMutex. Waiting on this flag instead of on a bare
    // notification protects against spurious wakeups of Condition.wait()
    // (which would let close() return while the scoped delegate below still
    // refers to this stack frame) and against the notification being issued
    // before wait() is entered.
    bool done = false;

    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks whether at least one byte can be read from the socket, without
   * consuming it.
   *
   * Returns: false if the socket is not open, otherwise whether the peek
   *   yielded any data.
   *
   * Throws: TTransportException (UNKNOWN) if peeking into the socket fails.
   */
  override bool peek() {
    if (!isOpen) return false;

    // NOTE(review): the socket is non-blocking, so a peek with no data
    // pending presumably fails with the would-block errno and ends up in the
    // throw below rather than returning false – confirm this is intended.
    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        // Where connresetOnPeerShutdown is set, ECONNRESET is treated as the
        // peer having gone away rather than as an error.
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket, yielding the current
   * fiber to the async manager while the operation would block.
   *
   * Params:
   *   buf = Buffer to read into.
   *
   * Returns: The number of bytes actually read; 0 where
   *   connresetOnPeerShutdown is set and the receive failed with ECONNRESET.
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), the
   *   wait for data timed out (TIMED_OUT), or receiving failed (UNKNOWN).
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    typeof(getSocketErrno()) lastErrno;

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // Report ECONNRESET as a zero-length read instead of as an error.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer to the socket, calling writeSome() until every
   * byte has been sent.
   *
   * Params:
   *   buf = Data to send.
   *
   * Throws: TTransportException, see writeSome().
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of the buffer as the socket accepts in a single send
   * operation, yielding the current fiber to the async manager while the
   * operation would block.
   *
   * Params:
   *   buf = Data to send.
   *
   * Returns: The number of bytes sent, always greater than zero.
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), the
   *   wait for the socket timed out (TIMED_OUT), or sending failed (NOT_OPEN
   *   if the error indicates a closed connection, in which case the socket
   *   is closed as well; UNKNOWN otherwise).
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away and resets socket_ to null,
   * without going through the async manager's work queue.
   *
   * Does nothing if the socket has already been torn down, so that a work
   * item which runs after an earlier one has closed the connection (e.g.
   * due to a timeout) does not dereference a null socket.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates the (lazy) socket call, and, as long as it fails because the
   * operation would block, registers a one-shot listener for the given event
   * type with the async manager, yields the current fiber and retries once
   * resumed.
   *
   * Params:
   *   call = The socket operation to perform; re-evaluated on each attempt.
   *   eventType = The kind of readiness to wait for. Also selects the
   *     timeout: recvTimeout_ for READ, sendTimeout_ for WRITE.
   *
   * Returns: The result of the first evaluation of call that did not fail
   *   with the would-block errno (this may still be Socket.ERROR).
   *
   * Throws: TTransportException (TIMED_OUT) if the wait timed out; the
   *   socket is closed in that case.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      // Written by the listener below before the fiber is resumed.
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}

private {
  // std.socket doesn't include SO_ERROR for reasons unknown.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    import core.sys.windows.winsock2 : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}